package com.gitee.dengmin.logback;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @Author dengmin
 * @Created 2020/6/8 下午4:41
 */
public class KafkaProducerFactory  implements PooledObjectFactory<KafkaProducer<String,String>> {
    private static final long serialVersionUID = 1L;

    private Properties properties = new Properties();

    public KafkaProducerFactory(String hosts){
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 20000);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    }

    public PooledObject<KafkaProducer<String,String>> makeObject() throws Exception {
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        return new DefaultPooledObject<KafkaProducer<String, String>>(producer);
    }

    public void destroyObject(PooledObject<KafkaProducer<String,String>> pooledObject) throws Exception {
        pooledObject.getObject().close();
    }

    public boolean validateObject(PooledObject pooledObject) {
        return true;
    }

    public void activateObject(PooledObject pooledObject) throws Exception {

    }

    public void passivateObject(PooledObject pooledObject) throws Exception {

    }
}
